/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.paimon.globalindex;

import org.apache.paimon.fs.FileIO;
import org.apache.paimon.index.GlobalIndexMeta;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Range;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.IntFunction;
import java.util.stream.Collectors;

import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkNotNull;

/**
 * Scanner for shard-based global indexes.
 *
 * <p>The scanner is built from a list of index manifest entries. Their index files are grouped by
 * indexed field id and then by index type, and a {@link GlobalIndexEvaluator} is created with a
 * function that builds the {@link GlobalIndexReader}s for a field id from that grouping.
 * Predicates passed to {@link #scan(Predicate)} are delegated to that evaluator.
 */
public class RowRangeGlobalIndexScanner implements Closeable {

    /** Table options, passed to every {@link GlobalIndexer} created by this scanner. */
    private final Options options;

    /** Evaluator that answers predicates using the readers produced by this scanner. */
    private final GlobalIndexEvaluator globalIndexEvaluator;

    /**
     * Creates a scanner over the given index manifest entries.
     *
     * <p>Each entry is validated through {@code checkArgument}: its index file must carry a
     * {@link GlobalIndexMeta}, and the row range of that meta must satisfy {@code
     * Range.intersect} against the requested row range.
     *
     * @param fileStoreTable table that supplies the options, file IO, path factory and row type
     * @param rowRangeStart start of the row range this scanner is responsible for
     * @param rowRangeEnd end of the row range this scanner is responsible for
     * @param entries index manifest entries whose index files back this scanner
     */
    public RowRangeGlobalIndexScanner(
            FileStoreTable fileStoreTable,
            long rowRangeStart,
            long rowRangeEnd,
            List<IndexManifestEntry> entries) {
        this.options = fileStoreTable.coreOptions().toConfiguration();

        // The message does not depend on the entry, so build it once instead of per iteration.
        String noIntersectionMessage =
                "All index files must have an intersection with row range ["
                        + rowRangeStart
                        + ", "
                        + rowRangeEnd
                        + ")";

        // Validate and group in a single pass: field id -> index type -> index files.
        Map<Integer, Map<String, List<IndexFileMeta>>> indexMetas = new HashMap<>();
        for (IndexManifestEntry entry : entries) {
            IndexFileMeta indexFile = entry.indexFile();
            GlobalIndexMeta meta = indexFile.globalIndexMeta();
            // Check for a missing meta first so that it is reported with an accurate message
            // rather than as a row range mismatch.
            checkArgument(meta != null, "Global index meta must not be null");
            checkArgument(
                    Range.intersect(
                            rowRangeStart,
                            rowRangeEnd,
                            meta.rowRangeStart(),
                            meta.rowRangeEnd()),
                    noIntersectionMessage);
            indexMetas
                    .computeIfAbsent(meta.indexFieldId(), k -> new HashMap<>())
                    .computeIfAbsent(indexFile.indexType(), k -> new ArrayList<>())
                    .add(indexFile);
        }

        FileIO fileIO = fileStoreTable.fileIO();
        GlobalIndexFileReadWrite indexFileReadWrite =
                new GlobalIndexFileReadWrite(
                        fileIO, fileStoreTable.store().pathFactory().globalIndexFileFactory());

        RowType rowType = fileStoreTable.rowType();

        // Readers are only built when the evaluator invokes this function for a field id.
        IntFunction<Collection<GlobalIndexReader>> readersFunction =
                fieldId ->
                        createReaders(
                                indexFileReadWrite,
                                indexMetas.get(fieldId),
                                rowType.getField(fieldId).type());
        this.globalIndexEvaluator = new GlobalIndexEvaluator(rowType, readersFunction);
    }

    /**
     * Evaluates the predicate against the global indexes known to this scanner.
     *
     * @param predicate predicate to evaluate
     * @return whatever {@link GlobalIndexEvaluator#evaluate} returns for the predicate
     */
    public Optional<GlobalIndexResult> scan(Predicate predicate) {
        return globalIndexEvaluator.evaluate(predicate);
    }

    /**
     * Creates one reader per index type for a single field.
     *
     * @param indexFileReadWrite file accessor handed to every created reader
     * @param indexMetas index files of the field keyed by index type, or {@code null} when the
     *     field has no index files
     * @param fieldType data type of the indexed field
     * @return the created readers; an empty collection when {@code indexMetas} is {@code null}
     * @throws RuntimeException wrapping the {@link IOException} raised while creating a reader
     */
    private Collection<GlobalIndexReader> createReaders(
            GlobalIndexFileReadWrite indexFileReadWrite,
            Map<String, List<IndexFileMeta>> indexMetas,
            DataType fieldType) {
        if (indexMetas == null) {
            return Collections.emptyList();
        }

        Set<GlobalIndexReader> readers = new HashSet<>();
        try {
            for (Map.Entry<String, List<IndexFileMeta>> entry : indexMetas.entrySet()) {
                String indexType = entry.getKey();
                List<IndexFileMeta> metas = entry.getValue();
                GlobalIndexerFactory globalIndexerFactory =
                        GlobalIndexerFactoryUtils.load(indexType);
                GlobalIndexer globalIndexer = globalIndexerFactory.create(fieldType, options);
                List<GlobalIndexIOMeta> globalMetas =
                        metas.stream().map(this::toGlobalMeta).collect(Collectors.toList());
                readers.add(globalIndexer.createReader(indexFileReadWrite, globalMetas));
            }
        } catch (IOException e) {
            // NOTE(review): readers created before the failure are not released here. If
            // GlobalIndexReader holds resources, they should probably be closed on this path;
            // confirm against the reader contract.
            throw new RuntimeException("Failed to create global index reader", e);
        }

        return readers;
    }

    /**
     * Converts an index file meta into the IO meta passed to a {@link GlobalIndexer} when
     * creating a reader.
     *
     * @param meta index file meta, which must carry a non-null {@link GlobalIndexMeta}
     * @return IO meta built from the file name, file size, row range and index meta
     */
    private GlobalIndexIOMeta toGlobalMeta(IndexFileMeta meta) {
        GlobalIndexMeta globalIndex = meta.globalIndexMeta();
        checkNotNull(globalIndex);
        return new GlobalIndexIOMeta(
                meta.fileName(),
                meta.fileSize(),
                new Range(globalIndex.rowRangeStart(), globalIndex.rowRangeEnd()),
                globalIndex.indexMeta());
    }

    /**
     * Closes the underlying {@link GlobalIndexEvaluator}.
     *
     * @throws IOException if closing the evaluator fails
     */
    @Override
    public void close() throws IOException {
        globalIndexEvaluator.close();
    }
}
